
Core: Add new CommandRequest - Pipeline #2954

Open

wants to merge 15 commits into base: main
Conversation

@shohamazon (Collaborator) commented Jan 15, 2025

This PR introduces a new CommandRequest type: Pipeline. A Pipeline represents a batch of commands that are sent to Valkey for execution, similar to a Transaction. However, unlike a transaction, a pipeline has the following distinguishing characteristics:

  1. Non-Atomic Execution:
    Transactions in Valkey are atomic. Pipelines, in contrast, provide no such guarantee.
  2. Multi-Node Support:
    Transactions are limited to a single Valkey node, because all commands within a transaction must map to the same slot in cluster mode. Pipelines, however, can span multiple nodes, allowing commands to target different slots or to be multi-node commands (e.g., PING, or an MSET whose keys hash to different slots). In a transaction, such multi-node commands would simply be routed to a single node.
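
From the protocol's point of view, the difference comes down to the MULTI/EXEC framing a transaction adds around the same command stream; the atomicity and single-slot restriction follow from that framing. A minimal sketch (hypothetical helper names, not part of this PR or of the redis crate):

```rust
/// Hypothetical helpers illustrating the wire-level framing difference.
/// A transaction wraps the commands in MULTI/EXEC; a pipeline sends them
/// back-to-back with no framing, which is why it carries no atomicity or
/// single-slot restriction.
fn transaction_frames(cmds: &[&str]) -> Vec<String> {
    let mut frames = vec!["MULTI".to_string()];
    frames.extend(cmds.iter().map(|c| c.to_string()));
    frames.push("EXEC".to_string());
    frames
}

fn pipeline_frames(cmds: &[&str]) -> Vec<String> {
    cmds.iter().map(|c| c.to_string()).collect()
}
```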

Implementation Details

To support the execution of pipelines in cluster mode, this PR introduces several changes:

1. Pipeline Splitting:

Since a pipeline can include commands targeting different slots, it needs to be split into sub-pipelines, each grouped by the node responsible for the relevant slots.
The process involves mapping each command in the pipeline to its corresponding node(s) based on the cluster's slot allocation. This mapping is handled by routing logic, which determines whether a command targets a single node or multiple nodes.
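
The splitting step can be sketched roughly as follows, with simplified, hypothetical types: commands are plain strings, and the `route` closure stands in for the cluster's slot-to-node lookup. Each command keeps its original index so responses can later be written back into the right position of the overall result.

```rust
use std::collections::HashMap;

// (original command index, command) pairs making up one per-node sub-pipeline.
type SubPipeline = Vec<(usize, String)>;

// Group each command under the address of the node owning its slot.
fn split_pipeline(
    commands: &[&str],
    route: impl Fn(&str) -> String, // stand-in for the cluster's slot-to-node routing
) -> HashMap<String, SubPipeline> {
    let mut by_node: HashMap<String, SubPipeline> = HashMap::new();
    for (index, &cmd) in commands.iter().enumerate() {
        by_node
            .entry(route(cmd))
            .or_default()
            .push((index, cmd.to_string()));
    }
    by_node
}
```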

2. Node Communication:

Once the pipeline is split, each sub-pipeline is sent to its respective node for execution.
For commands that span multiple nodes, the implementation ensures the responses are tracked and aggregated to form a cohesive result.

3. Response Aggregation:

To handle multi-node commands, the responses from each node are stored along with the node's address. This allows for proper aggregation and processing of results, particularly when commands require combining responses (e.g., for commands like MGET).
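
As a rough sketch of the aggregation idea, with simplified types (plain integers instead of redis::Value, and only the "combine arrays" policy used for MGET-style commands shown):

```rust
// Each entry pairs a node's partial response with that node's address,
// mirroring how responses are stored before aggregation.
type NodeResponse = (Vec<i64>, String); // (partial response, source node address)

// "Combine arrays" policy: concatenate the per-node chunks into one result.
// The PR's real code additionally orders chunks by their sub-command index.
fn combine_arrays(responses_for_command: Vec<NodeResponse>) -> Vec<i64> {
    responses_for_command
        .into_iter()
        .flat_map(|(values, _address)| values)
        .collect()
}
```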

Summary

This PR introduces the Pipeline request type, enabling non-atomic batch command execution in Glide.

Issue link

This Pull Request is linked to issue (URL): [REPLACE ME]

Checklist

Before submitting the PR make sure the following are checked:

  • This Pull Request is related to one issue.
  • Commit message has a detailed description of what changed and why.
  • Tests are added or updated.
  • CHANGELOG.md and documentation files are updated.
  • Destination branch is correct - main or release
  • Create merge commit if merging release branch into main, squash otherwise.

@ikolomi ikolomi self-requested a review January 16, 2025 10:53
@shohamazon added the labels Rust core (redis-rs/glide-core matter) and Core changes (significant changes that should trigger the full test matrix) Jan 16, 2025
@shohamazon shohamazon marked this pull request as ready for review January 16, 2025 10:55
@shohamazon shohamazon requested a review from a team as a code owner January 16, 2025 10:55
pipeline: &'a redis::Pipeline,
) -> redis::RedisFuture<'a, Value> {
let command_count = pipeline.cmd_iter().count();
let _offset = command_count + 1; //TODO: check

resolve TODO

.push((index, inner_index));
}

async fn routes_pipeline_commands(

find a better name and make it shorter

pipeline: &crate::Pipeline,
core: Core<C>,
) -> RedisResult<(
HashMap<String, (Pipeline, C, Vec<(usize, Option<usize>)>)>,

encapsulate in struct

match cluster_routing::RoutingInfo::for_routable(cmd) {
Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
| None => {
if pipelines_by_connection.is_empty() {

comment

} else {
// since the map is not empty, add the command to a random connection within the map.
let mut rng = rand::thread_rng();
let keys: Vec<_> = pipelines_by_connection.keys().cloned().collect();

rename to addresses


think about a way to avoid cloning the addresses

for (index, routing_info, response_policy) in response_policies {
#[allow(clippy::type_complexity)]
// Safely access `values_and_addresses` for the current index
let response_receivers: Vec<(

use structs for complex types


// Collect final responses
for mut value in values_and_addresses.into_iter() {
assert_eq!(value.len(), 1);

don't use asserts in production code


Use index 0 instead of popping and unwrapping

.map_err(|err| (OperationTarget::FanOut, err))?;

// Update `values_and_addresses` for the current index
values_and_addresses[index] = vec![(aggregated_response, "".to_string())];

use index 0 for storing aggregated_response


Let's try to use Pipeline for both transactions and pipelines, differentiating by is_atomic

@@ -501,6 +501,10 @@ message Transaction {
repeated Command commands = 1;
}

message Pipeline {

let's remove Transaction and use Pipeline + an is_atomic flag


Please also add comments there describing things


From protocol's view the only difference between pipeline and transaction is 2 extra commands MULTI and EXEC

shohamazon (author) replied:

also being atomic + multi slots enabled


@barshaul barshaul left a comment


Some initial notes (note to self - got up to handle_single_node_route)

)
})
} else {
// Pipeline is not atomic, so we can have commands with different slots.

Please add documentation to the function and state that for non-atomic pipelines it returns None

Comment on lines 1129 to 1131
let addresses: Vec<_> = pipeline_map.keys().cloned().collect();
let random_address = addresses.choose(&mut rng).unwrap();
let context = pipeline_map.get_mut(random_address).unwrap();

@barshaul barshaul Jan 19, 2025


  1. Don't use unwrap in production code. If a bug caused pipeline_map.keys() to be empty, it would crash the whole client. Instead, change this function to return a Result and return a ClientError if no random address is found.
  2. Copying all the addresses is redundant; you can achieve the same with:
    let mut rng = rand::thread_rng();
    if let Some(node_context) = pipeline_map
        .values_mut()
        .choose(&mut rng) {
            node_context.add_command(cmd, index, None);
            Ok(())
    } else {
        // return error
    }

Comment on lines 2207 to 2208
// This function handles commands with routing info of MultiSlot (like MSET or MGET), creates sub-commands for the matching slots and add it to the correct pipeline
async fn handle_multi_slot_routing(

The name of the function should declare that it's only relevant for pipelines, the current name is misleading

};
if let Some((address, conn)) = conn {
let new_cmd =
crate::cluster_routing::command_for_multi_slot_indices(cmd, indices.iter());

add use crate::cluster_routing::command_for_multi_slot_indices and remove the prefix

}
}

fn determine_internal_routing(

missing description, it ain't clear when and why it would be used

Comment on lines 2256 to 2257
// This function handles commands with routing info of SingleNode
async fn handle_single_node_route(

same issue - all of these functions are placed under ClusterConnInner though they are only relevant for pipelines. I'm not sure there is a good reason placing them here. Why not moving all of the pipelines logic into a separate file (e.g. pipeline, pipeline_routing) under the async_cluster folder?


BoazBD commented Jan 21, 2025

Ignore the review request ^, added by accident. 🙂


@barshaul barshaul left a comment


This PR is long lol. Most comments concern code readability, overly complex types, and redundant 'pub' declarations; please fix these everywhere they apply, not only where I commented.
Will continue tomorrow

@@ -2125,7 +2092,7 @@ where
.map_err(|err| (address.into(), err))
}

async fn try_pipeline_request(
pub async fn try_pipeline_request(

why pub? pub refers to user-facing APIs


see if it can be removed or if pub(crate) is enough

@@ -2180,7 +2233,7 @@ where
}
}

async fn get_connection(
pub async fn get_connection(

same - function shouldn't be pub

@@ -2139,6 +2106,38 @@ where
.map_err(|err| (OperationTarget::Node { address }, err))
}

/// Aggregates responses for multi-node commands and updates the `values_and_addresses` vector.

Suggested change
/// Aggregates responses for multi-node commands and updates the `values_and_addresses` vector.
/// Aggregates pipeline responses for multi-node commands and updates the `values_and_addresses` vector.

/// - It collects responses and their source node addresses from the corresponding entry in `values_and_addresses`.
/// - Uses the routing information and optional response policy to aggregate the responses into a single result.
///
/// The aggregated result replaces the existing entries in `values_and_addresses` for the given command index.

I read the description 3 times and it's still unclear to me what this function does :(
maybe a simple example would help.

/// - It collects responses and their source node addresses from the corresponding entry in `values_and_addresses`.
/// - Uses the routing information and optional response policy to aggregate the responses into a single result.
///
/// The aggregated result replaces the existing entries in `values_and_addresses` for the given command index.

what do you mean by "replaces the existing entries in values_and_addresses for the given command index."? do you mean that it sorts the entries in values_and_addresses by the command indices calculated in this function? try to make it clearer

pub async fn execute_pipeline_on_node<C>(
address: String,
node_context: NodePipelineContext<C>,
) -> Result<(Vec<(usize, Option<usize>)>, Vec<Value>, String), (OperationTarget, RedisError)>

Please create a type alias for the return type (Vec<(usize, Option<usize>)>, Vec<Value>, String); it isn't readable

// might produce multiple responses, each from a different node. By storing the responses with their
// respective node addresses, we ensure that we have all the information needed to aggregate the results later.
// This structure is essential for handling scenarios where responses from multiple nodes must be combined.
let mut values_and_addresses = vec![Vec::new(); pipeline.len()];

we can make this complex type a bit clearer with type aliases:

// define on top
type NodeResponse = (Value, String); // A response with its source node address.
type PipelineResponses = Vec<Vec<NodeResponse>>; // Outer Vec: pipeline commands, Inner Vec: (response, address).
...

let mut values_and_addresses: PipelineResponses = vec![Vec::new(); pipeline.len()];

and it can also be used elsewhere (e.g. aggregate_pipeline_multi_node_commands)

#[allow(clippy::type_complexity)]
pub async fn collect_pipeline_tasks(
join_set: &mut tokio::task::JoinSet<
Result<(Vec<(usize, Option<usize>)>, Vec<Value>, String), (OperationTarget, RedisError)>,

same-type alias


then remove #[allow(clippy::type_complexity)]

Comment on lines 339 to 340
/// - `Ok(Some((OperationTarget, RedisError)))`: If one or more tasks encountered an error, returns the first error.
/// - `Ok(None)`: If all tasks completed successfully.

@barshaul barshaul Jan 21, 2025


since returning Ok with Some(err) is confusing, you can make it more readable with some enum representing the return values, something like

enum MultiPipelineResult {
    /// All tasks completed successfully.
    AllSuccessful,

    /// Some tasks failed, returning the first encountered error and the associated operation target.
    FirstError {
        target: OperationTarget,
        error: RedisError,
    },
}


@barshaul barshaul left a comment


haven't finished yet - but some comments

let lock = core.conn_lock.read().expect(MUTEX_READ_ERR);
lock.connection_for_route(route)
};
if let Some((address, conn)) = conn {

what happens if conn is None?

node_context.add_command(cmd, index, None);
Ok(())
} else {
Err(RedisError::from((ErrorKind::IoError, "No nodes available")))

it isn't an IO error.


oh ok, I see now that you did this logic outside of this function. I think it would be more readable if you'd move all of this logic into the handle_pipeline_single_node_routing function; then you can skip this redundant error handling with something like:

async fn handle_pipeline_single_node_routing(...) {
     if matches!(routing, InternalSingleNodeRouting::Random) && !pipeline_map.is_empty() {
        let mut rng = rand::thread_rng();
        if let Some(node_context) = pipeline_map.values_mut().choose(&mut rng) {
            node_context.add_command(cmd, index, None);
            return Ok(()); 
        }
    }

    let (address, conn) =
        ClusterConnInner::get_connection(routing, core, Some(Arc::new(cmd.clone())))
            .await
            .map_err(|err| (OperationTarget::NotFound, err))?;

    add_command_to_node_pipeline_map(pipeline_map, address, conn, cmd, index, None);
    Ok(())

// The routing info is to a random node, and we already have sub-pipelines within our pipelines map, so just add it to a random sub-pipeline
add_command_to_random_existing_node(pipeline_map, cmd, index)
.map_err(|err| (OperationTarget::NotFound, err))?;
Ok(())

nit: you can remove this line and line 156 and return a single Ok(()) at the end of the function


// Collect final responses
for mut value in pipeline_responses.into_iter() {
// unwrap() is safe here because we know that the vector is not empty

It isn't safe and we can't assume it; check that the vector isn't empty and raise an error if it is

/// - An optional response policy that dictates how the responses should be aggregated.
///
/// # Returns
/// * `Result<(), (OperationTarget, RedisError)>` - Returns `Ok(())` if the aggregation is successful, or an error tuple containing the operation target and the Redis error if it fails.

nit: when you hardcode types in the documentation it creates another place we'll have to remember to update if the types change.

/// aggregate_pipeline_multi_node_commands(&mut pipeline_responses, response_policies).await.unwrap();
///
/// // After aggregation, pipeline_responses will be updated with aggregated results
/// assert_eq!(pipeline_responses[0], vec![(Value::Int(6), "".to_string())]);

"".to_string()?


why does it return an empty string instead of some representing enum/None


do you still need this field at this stage? can't you completely filter it out?

shohamazon (author) replied:

I can't filter it out, since I only go through the commands in the pipeline with multi-node routing; pipeline_responses still contains the rest of the commands (those with single-node routing), whose entries have the type (response, address)

let mut response_policies = Vec::new();

for (index, cmd) in pipeline.cmd_iter().enumerate() {
match cluster_routing::RoutingInfo::for_routable(cmd).unwrap_or(

nit: import cluster_routing::RoutingInfo and remove all redundant prefixes

cluster_routing::RoutingInfo::SingleNode(route) => {
handle_pipeline_single_node_routing(
&mut pipelines_by_connection,
cmd.clone(),

cloning all commands is expensive, is it necessary?

shohamazon (author) replied:

yes 😞


why?


suggestion - can we wrap the commands with Arc? If it's too complex, please add it as a todo; it can be done in a separate PR

@shohamazon shohamazon requested a review from barshaul February 9, 2025 13:54
Signed-off-by: Shoham Elias <[email protected]>

@barshaul barshaul left a comment


will continue tomorrow

@@ -415,6 +420,28 @@ impl Client {
.boxed()
}

pub fn send_pipeline<'a>(

please add description so it would be clear how this function is different from send_transaction

)
.await
if pipeline.is_atomic() || sub_pipeline {
// If the pipeline is atomic (i.e., a transaction) or if the pipeline is already splitted into sub-pipelines, we can send it as is, with no need to split it into sub-pipelines.

Suggested change
// If the pipeline is atomic (i.e., a transaction) or if the pipeline is already splitted into sub-pipelines, we can send it as is, with no need to split it into sub-pipelines.
// If the pipeline is atomic (i.e., a transaction) or if the pipeline is already splitted into sub-pipelines (i.e., the pipeline is already routed to a specific node), we can send it as is, with no need to split it into sub-pipelines.

.await
} else {
// The pipeline is not atomic and not already splitted, we need to split it into sub-pipelines and send them separately.
Self::handle_pipeline_request(&pipeline, core).await

Rename handle_pipeline_request to handle_multi_slot_pipeline_request / handle_non_atomic_pipeline or something like that.
Right now the code is confusing:

if atomic {
try_pipeline_request ()
} else {
handle_pipeline_request()
}

// A command can have one or more responses (e.g MultiNode commands).
// Each entry in `PipelineResponses` corresponds to a command in the original pipeline and contains
// a vector of tuples where each tuple holds a response to the command and the address of the node that provided it.
let mut pipeline_responses: PipelineResponses = vec![Vec::new(); pipeline.len()];

if the pipeline.len == 1, aren't we better off by using the try_pipeline_request function without all of this additional logic?

shohamazon (author) replied:

Not exactly: if pipelines_by_node == 1 we can avoid all this logic, but pipeline.len == 1 can still contain a multi-node command

cluster_routing::RoutingInfo::SingleNode(route) => {
handle_pipeline_single_node_routing(
&mut pipelines_by_connection,
cmd.clone(),

why?

return Err((
OperationTarget::NotFound,
RedisError::from((
ErrorKind::AllConnectionsUnavailable, // should use different kind? ConnectionNotFoundForRoute

I think that if we don't have any primary connections it is ok to set the client with AllConnectionsUnavailable


just make the error message clearer if it's due to all connections or only primary ones

}

let (address, conn) =
ClusterConnInner::get_connection(routing, core, Some(Arc::new(cmd.clone())))

you're cloning the command here again :(


If it had been wrapped with Arc to begin with, we could have avoided this clone

cluster_routing::RoutingInfo::SingleNode(route) => {
handle_pipeline_single_node_routing(
&mut pipelines_by_connection,
cmd.clone(),

suggestion - can we wrap the commands with Arc? If it's too complex, please add it as a todo; it can be done in a separate PR

// Processes the sub-pipelines to generate pending requests for execution on specific nodes.
// Each pending request encapsulates all the necessary details for executing commands on a node.
//
// Returns:

you don't need to document the collect_pipeline_requests function here - you already do that in the function declaration

///
/// # Parameters
///
/// - `pipeline_responses`: A vec that holds the original pipeline commands responses.

it doesn't hold the responses yet, does it? This function is what stores the responses in it. I think it would be more readable if this function created and returned the PipelineResponses

Ok(Err(err)) => {
return Err((OperationTarget::Node { address }, err));
}
_ => {

so it seems like if one of the sub-pipelines returns a RecvError, you're returning an error for the entire pipeline. Is that the desired behavior? If it is, you could return the error immediately from collect_and_send_pending_requests, so it would return Vec<RedisResult<Response>> instead of Vec<Result<RedisResult<Response>, RecvError>>; that would make the code more readable. But you know the next step of error handling better, so do what you think is best

.iter()
.map(|(value, address)| {
let (sender, receiver) = oneshot::channel();
let _ = sender.send(Ok(Response::Single(value.clone())));

remove clone


can you try changing the aggregate_results function to accept a vec of (result, address) instead of receivers

.await
.map_err(|err| (OperationTarget::FanOut, err))?;

pipeline_responses[index] = vec![(aggregated_response, "".to_string())];

as we discussed - let's try to have a single pass over the PipelineResponses: responses without a response policy would simply be inserted into a new results vec (dropping their node address), and multi-node responses would be aggregated, after which their address can be dropped instead of storing "".to_string()

///
/// This function distributes the commands in the pipeline across the cluster nodes based on routing information, collects the responses,
/// and aggregates them if necessary according to the specified response policies.
async fn handle_pipeline_request(

rename to something like handle_non_atomic_pipeline_request

Ok(Err(err)) => {
return Err((OperationTarget::Node { address }, err));
}
_ => {

instead of _, match Err(err) explicitly to avoid catching other cases

address.clone(),
);
}
}

please also take care of the case Ok(Ok(Response::Single)): return a ClientError

where
C: Clone,
{
// inner_index is used to keep track of the index of the sub-command inside cmd

the index of the sub-commands in the multi slot routing info vector

Some(inner_index) => {
// Ensure the vector at the given index is large enough to hold the value and address at the specified position
if pipeline_responses[index].len() <= inner_index {
pipeline_responses[index].resize(inner_index + 1, (Value::Nil, "".to_string()));

add a todo: change pipeline_responses so that [index] holds a vector already sized to the expected number of responses


on the next PR change Value::Nil to error

}
pipeline_responses[index][inner_index] = (value, address);
}
None => pipeline_responses[index].push((value, address)),

change pipeline_responses[index] to get() and return error if index isn't found
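
The (index, inner_index) bookkeeping under discussion can be sketched roughly as follows, with simplified types (plain integers instead of redis::Value) and a hypothetical add_response helper standing in for the PR's real insertion code:

```rust
// (value, source node address) stored per response.
type Entry = (i64, String);

// `index` is the command's position in the original pipeline; `inner_index`,
// when present, is the sub-command's position within a multi-slot command, so
// each node's chunk lands in a fixed slot even if nodes reply out of order.
fn add_response(
    pipeline_responses: &mut [Vec<Entry>],
    index: usize,
    inner_index: Option<usize>,
    value: i64,
    address: String,
) {
    // A real implementation should bounds-check `index` instead of panicking.
    match inner_index {
        Some(inner) => {
            if pipeline_responses[index].len() <= inner {
                // Grow the vector so the chunk can be placed at its position.
                pipeline_responses[index].resize(inner + 1, (0, String::new()));
            }
            pipeline_responses[index][inner] = (value, address);
        }
        None => pipeline_responses[index].push((value, address)),
    }
}
```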

@@ -388,6 +389,18 @@ async fn send_transaction(
.map_err(|err| err.into())
}

async fn send_pipeline(request: Pipeline, client: &mut Client) -> ClientUsageResult<Value> {

merge with send_transaction

fn test_send_pipeline_and_get_array_of_results(
#[values(RedisType::Cluster, RedisType::Standalone)] use_cluster: RedisType,
) {
let test_basics = setup_test_basics(Tls::NoTls, TestServer::Shared, use_cluster);

add test for read from replica


can you create a list of all test cases you're checking
